#!/usr/bin/env jq
# compile-config-2.01-grounding -- Adds processes for grounding the factor graph
##

include "constants";
include "util";
include "sql";

# skip adding grounding processes unless there are variables and factors defined
if (.deepdive_.schema.variables_  | length) == 0
or (.deepdive_.inference.factors_ | length) == 0
then . else

def factorWeightDescriptionSqlExpr:
    # Builds the SQL expression that serializes the description of a weight:
    # the literal "<factorName>-" followed by every weight parameter column
    # rendered as TEXT (NULLs become empty strings), all concatenated with a
    # literal "-" in between.
    ("-" | asSqlLiteral) as $separator
    | [ "\(.factorName)-" | asSqlLiteral ]
    + [ .weight_.params[] | asSqlIdent |
        "CASE WHEN \(.) IS NULL THEN ''
              ELSE CAST(\(.) AS TEXT)
          END"
      ]
    | join(" ||\($separator)|| ")
    ;

.deepdive_ as $deepdive

###############################################################################

## variable/*/materialize
# Each internal variable table holding distinct variables should be
# materialized first for correct id assignment, etc.
| .deepdive_.execution.processes += merge($deepdive.schema.variables_[] | {
    # One process per declared variable.  Its shell command:
    #  1. analyzes the variable's table,
    #  2. for categorical variables only, creates the categories table holding
    #     every distinct combination of the category columns (stored under
    #     "_"-prefixed names) with a row count and an empty cid column, then
    #     numbers the rows through `deepdive db assign_sequential_id`,
    #  3. creates the ids table with one row per distinct combination of key
    #     columns, carrying an aggregated internal label, a row count, a
    #     partition number and an empty id column, and finally analyzes it.
    # The partition number XORs (`#` is PostgreSQL's bitwise XOR) the first
    # 32 bits of the md5 of every partition column and takes the result modulo
    # the configured number of sampler partitions; it is 0 when the variable
    # declares no partition columns.
    "process/grounding/variable/\(.variableName)/materialize": {
        # runs after the nominal from_grounding process and needs the variable's data
        dependencies_: [
            "process/grounding/from_grounding",
            "data/\(.variablesTable)"
        ],
        style: "cmd_extractor", cmd: "

        deepdive db analyze \(.variablesTable | @sh)

        \(if .variableType == "categorical" then "
        # generate a table holding all distinct categories for categorical variables
        # so a unique id can be assigned to each of them
        deepdive create table \(.variablesCategoriesTable | @sh) as \(
        { SELECT:
            [ (.variablesCategoryColumns[]
            | { alias: "_\(.)", table: "v", column: . })
            , { alias: "count", expr: "COUNT(1)"             }
            , { alias: "cid"  , expr: "CAST(NULL AS BIGINT)" }
            ]
        , FROM: [ { alias: "v", table: .variablesTable } ]
        , GROUP_BY: [ .variablesCategoryColumns[] |         { table: "v", column: . }   ]
        } | asSql | asPrettySqlArg)
        # assign unique id to every distinct category

        deepdive db assign_sequential_id \(.variablesCategoriesTable | @sh) cid

        deepdive db analyze \(.variablesCategoriesTable | @sh)

        " else "" end)

        # generate a table holding all distinct variables identified by @key columns
        # so a unique id can be assigned to each of them
        deepdive create table \(.variablesIdsTable | @sh) as \(
        { SELECT:
            [ (.variablesKeyColumns[]
            | { alias: ., table: "v", column: . })
            , { alias: deepdiveVariableInternalLabelColumn, expr:
                # boolean variables just take one label, so aggregate with AND
                ( if .variableType == "boolean" then "EVERY(\"v\".\(.variablesLabelColumn | asSqlIdent))"
                # categorical variables should point to the category id that has a true label column
                else "MIN(CASE WHEN \"v\".\(.variablesLabelColumn | asSqlIdent) THEN \"c\".\"cid\" ELSE NULL END)"
                end) }
            , { alias: deepdiveVariableInternalFrequencyColumn, expr: "COUNT(1)" }
            , { alias: deepdiveVariableInternalPartitionColumn, expr:
                (if (.variablesParitionColumns | any) then
                    "((" +
                    ([ .variablesParitionColumns[] | "('x'||substr(coalesce(md5(\(.)::text),''),1,8))::bit(32)"]
                     | join(" # ")) + ")::BIGINT % \($deepdive.sampler.partitions))::INT"
                else "0::INT" end) }
            , { alias: deepdiveVariableIdColumn, expr: "CAST(NULL AS BIGINT)" }
            ]
        , FROM: [ { alias: "v", table: .variablesTable } ]
        , JOIN:
            ( if .variableType == "boolean" then null else
                # categorical variables need a join with the categories table
                # to find the correct internal label, etc.
                [ { INNER: { alias: "c", table: .variablesCategoriesTable }
                  , ON: { and: [ .variablesCategoryColumns[]
                               | { eq: [ { table: "c", column: "_\(.)" }
                                       , { table: "v", column: . } ]
                                 } ] }
                  } ]
            end)
        , GROUP_BY: [ .variablesKeyColumns[] | { column: . } ]
        } | asSql | asPrettySqlArg)

        deepdive db analyze \(.variablesIdsTable | @sh)
        "
    }
})


## variable_assign_id
| .deepdive_.execution.processes += {
    # A single process that numbers the rows of every variable's ids table in
    # one assign_sequential_id call (comma-separated table list, id column,
    # partition column) and then re-analyzes each of those tables.
    "process/grounding/variable_assign_id": {
        # every variable must have been materialized first
        dependencies_: ($deepdive.schema.variables_ | map("process/grounding/variable/\(.variableName)/materialize")),
        style: "cmd_extractor", cmd: (
            ($deepdive.schema.variables_ | map(.variablesIdsTable)) as $idsTables |
        "
            deepdive db assign_sequential_id \($idsTables | join(",") | @sh) \(deepdiveVariableIdColumn | @sh) \(deepdiveVariableInternalPartitionColumn | @sh)

            \($idsTables | map("deepdive db analyze \(. | @sh)") | join("\n"))
        ")
    }
}


## variable/*/join_ids
# Each variable table is joined with its table of assigned ids, so that every
# row carries the variable id and partition that later processes rely on.
| .deepdive_.execution.processes += merge($deepdive.schema.variables_[] | {
    # One process per declared variable.  It creates the "with ids" table by
    # inner-joining the variable's table with its ids table on all key columns,
    # selecting the key columns, the category columns, the label column, the
    # label truthiness column, and the assigned id and partition taken from the
    # ids table; the new table is analyzed afterwards.
    "process/grounding/variable/\(.variableName)/join_ids": {
        # ids must have been assigned, and the variable's data must exist
        dependencies_: [
            "process/grounding/variable_assign_id",
            "data/\(.variablesTable)"
        ],
        style: "cmd_extractor", cmd: "

        deepdive create table \(.variablesWithIdsTable | @sh) as \(
        { SELECT:
            [ (.variablesKeyColumns[] |
              { table: "v", column: . })
            , (.variablesCategoryColumns[] |
              { table: "v", column: . })
            , { table: "v", column: .variablesLabelColumn }
            , { table: "v", column: deepdiveVariableLabelTruthinessColumn }
            , { table: "ids", column: deepdiveVariableIdColumn }
            , { table: "ids", column: deepdiveVariableInternalPartitionColumn }
            ]
        , FROM: [ { alias: "v", table: .variablesTable } ]
        , JOIN: [ { INNER: { alias: "ids", table: .variablesIdsTable }
                  , ON: { and: [ .variablesKeyColumns[]
                               | { eq: [ { table: "ids", column: . }
                                       , { table: "v", column: . } ]
                                 } ] }
                } ]
        } | asSql | asPrettySqlArg)

        deepdive db analyze \(.variablesWithIdsTable | @sh)
        "
    }
})


## variable_holdout
# Variables to holdout are recorded by executing either a user-defined
# (app-wide) holdout query, or by taking a random sample of a user-defined
# fraction.
# TODO easier way to do holdout per variable?
| .deepdive_.execution.processes += {
    # A single process that (re)creates the global holdout and observation
    # tables, each with one variable_id BIGINT PRIMARY KEY column, and then
    # populates them by running one `deepdive sql` command per statement:
    #  - the configured calibration.holdout_query if there is one; otherwise,
    #    for every variable, an INSERT of the ids whose internal label is not
    #    NULL and for which RANDOM() is below calibration.holdout_fraction,
    #  - the configured calibration.observation_query, if there is one.
    "process/grounding/variable_holdout": {
        # every variable must have gone through its join_ids process
        dependencies_: [
            $deepdive.schema.variables_[] | "process/grounding/variable/\(.variableName)/join_ids"
        ],
        style: "cmd_extractor", cmd: "

        deepdive create table \(deepdiveGlobalHoldoutTable | @sh) \\
            variable_id:BIGINT:'PRIMARY KEY' \\
            #
        deepdive create table \(deepdiveGlobalObservationTable | @sh) \\
            variable_id:BIGINT:'PRIMARY KEY' \\
            #
        \([ if $deepdive.calibration.holdout_query then
            # run user holdout query if configured
            "\($deepdive.calibration.holdout_query)"
          else
            # otherwise, randomly select from evidence variables of each variable table
            $deepdive.schema.variables_[] | "
                INSERT INTO \(deepdiveGlobalHoldoutTable | asSqlIdent) \(
                { SELECT: [ { column: deepdiveVariableIdColumn } ]
                , FROM: [ { table: .variablesIdsTable } ]
                , WHERE:
                    [ { isntNull: { column: deepdiveVariableInternalLabelColumn } }
                    , { lt: [ { expr: "RANDOM()" }
                            , { expr: $deepdive.calibration.holdout_fraction }
                            ]
                      }
                    ]
                } | asSql);
            "
          end
        , if $deepdive.calibration.observation_query then
            # run user observation query if configured
            "\($deepdive.calibration.observation_query)"
          else empty
          end
        ] | map("deepdive sql \(asPrettySqlArg)") | join("\n"))
        "
    }
}


## variable/*/dump
# Then each variable table is dumped into a set of binary files for the inference engine.
| .deepdive_.execution.processes += merge($deepdive.schema.variables_[]
    | . as $var
    | [range($deepdive.sampler.partitions) | {pid: .} + $var][]
    | {
    # One process per (variable, partition id).  Its shell command removes the
    # previous dump files of that partition, then streams the rows of the
    # variable's ids table that belong to the partition through
    # `sampler-dw text2bin variable` and pbzip2.  Every dumped row holds:
    #   vid            the low 48 bits of the assigned variable id
    #   variable_role  2 when the id is in the observation table and has a label,
    #                  0 when it is held out, 1 when it has a label, 0 otherwise
    #   init_value     0 for role 0, otherwise the label (boolean labels as 1/0)
    #   variable_type  0 for boolean, 1 for categorical
    #   cardinality    2 for boolean, the per-variable row count for categorical
    # Any other variableType aborts compilation with an internal error.
    "process/grounding/variable/\(.variableName)/\(.pid)/dump": {
        # the holdout/observation tables must be in place before dumping
        dependencies_: [
            "process/grounding/variable_holdout"
        ],
        style: "cmd_extractor", cmd: "
        : ${DEEPDIVE_GROUNDING_DIR:=\"$DEEPDIVE_APP\"/run/model/grounding}

        varPath=\"$DEEPDIVE_GROUNDING_DIR\"/variable/\(.variableName | @sh)
        mkdir -p \"$varPath\"
        cd \"$varPath\"
        find . -name 'variables.\(.pid).part-*.bin.bz2' -exec rm -rf {} +
        find . -name 'nvars.\(.pid).part-*' -exec rm -rf {} +
        export DEEPDIVE_LOAD_FORMAT=tsv
        export DEEPDIVE_UNLOAD_MATERIALIZED=false

        # dump the variables, joining the holdout query to determine the type of each variable
        deepdive compute execute \\
            input_sql=\(
            { SELECT:
                [ { column: "vid" }
                , { column: "variable_role" }
                , { alias: "init_value", expr:
                    "CASE WHEN variable_role = 0 THEN 0
                          ELSE (\(
                            if   .variableType == "boolean"     then "CASE WHEN label THEN 1 ELSE 0 END"
                            elif .variableType == "categorical" then "label"
                            else error("Internal error: Unknown variableType: \(.variableType)")
                            end
                            ))
                      END" }
                , { column: "variable_type" }
                , { column: "cardinality" }
                ]
            , FROM: { alias: "variables", sql:
                { SELECT:
                    [ { alias: "vid", expr: "i.\(deepdiveVariableIdColumn | asSqlIdent) & ((1::bigint << 48) - 1)" }
                    , { alias: "variable_role", expr:
                          "CASE WHEN observation.variable_id IS NOT NULL
                                 AND \"i\".\(deepdiveVariableInternalLabelColumn | asSqlIdent) IS NOT NULL THEN 2
                                WHEN holdout.variable_id IS NOT NULL THEN 0
                                WHEN \"i\".\(deepdiveVariableInternalLabelColumn | asSqlIdent) IS NOT NULL THEN 1
                                ELSE 0
                            END" }
                    , { alias: "label", table: "i", column: deepdiveVariableInternalLabelColumn }
                    , { alias: "variable_type", expr:
                            ( if .variableType == "boolean"     then 0
                            elif .variableType == "categorical" then 1
                            else error("Internal error: Unknown variableType: \(.variableType)")
                            end) }
                    , { alias: "cardinality", expr:
                            ( if .variableType == "boolean"     then 2
                            # read the frequency column through the same constant the materialize
                            # process used to create it, instead of hard-coding its name
                            elif .variableType == "categorical" then "\"i\".\(deepdiveVariableInternalFrequencyColumn | asSqlIdent)" # TODO count the number of actual distinct values by .variablesCategoryColumns for this row
                            else error("Internal error: Unknown variableType: \(.variableType)")
                            end) }
                    ]
                , FROM: { alias: "i", table: .variablesIdsTable }
                , JOIN:
                    [ { LEFT_OUTER: { alias: "holdout", table: deepdiveGlobalHoldoutTable }
                      , ON: { eq: [ { table: "i", column: deepdiveVariableIdColumn }
                                  , { table: "holdout"  , column: "variable_id" } ] } }
                    , { LEFT_OUTER: { alias: "observation", table: deepdiveGlobalObservationTable }
                      , ON: { eq: [ { table: "i"  , column: deepdiveVariableIdColumn }
                                  , { table: "observation", column: "variable_id" } ] } }
                    ]
                , WHERE:
                    [ { eq: [ { table: "i", column: deepdiveVariableInternalPartitionColumn }
                            , { expr: .pid }
                            ]
                      }
                    ]
                } }
            } | asSql | asPrettySqlArg) \\
            command=\("
                sampler-dw text2bin variable /dev/stdin /dev/stdout nvars.\(.pid).part-${DEEPDIVE_CURRENT_PROCESS_INDEX} | pbzip2 >variables.\(.pid).part-${DEEPDIVE_CURRENT_PROCESS_INDEX}.bin.bz2
            " | @sh) \\
            output_relation=
        "
    }

})


## variable/*/dump_domains
# Each categorical variable dumps an extra input for the inference engine that holds the domains.
| .deepdive_.execution.processes += merge($deepdive.schema.variables_[]
    | select(.variableType == "categorical")
    | . as $var
    | [range($deepdive.sampler.partitions) | {pid: .} + $var][]
    | {
    # One process per (categorical variable, partition id).  Its shell command
    # removes the previous domain files of that partition, then streams one
    # row per variable id of the partition through `sampler-dw text2bin domain`
    # and pbzip2.  Every dumped row holds:
    #   vid          the low 48 bits of the assigned variable id
    #   cardinality  the number of category ids joined to the variable
    #   cids         those category ids in ascending order
    #   truthiness   the label truthiness of each category (0 when NULL), in
    #                the same order as cids
    "process/grounding/variable/\(.variableName)/\(.pid)/dump_domains": {
        # waits for the join_ids process of every variable
        dependencies_: [
            $deepdive.schema.variables_[] | "process/grounding/variable/\(.variableName)/join_ids"
        ],
        style: "cmd_extractor", cmd: "
        : ${DEEPDIVE_GROUNDING_DIR:=\"$DEEPDIVE_APP\"/run/model/grounding}
        varPath=\"$DEEPDIVE_GROUNDING_DIR\"/variable/\(.variableName | @sh)
        mkdir -p \"$varPath\"
        cd \"$varPath\"
        find . -name 'domains.\(.pid).part-*.bin.bz2' -exec rm -rf {} +
        export DEEPDIVE_LOAD_FORMAT=tsv
        export DEEPDIVE_UNLOAD_MATERIALIZED=false

        # dump the categorical variable domains, joining the categories table for their ids
        deepdive compute execute \\
        input_sql=\(
        # sparse categorical variables have domains files
        { SELECT:
            [ { alias: "vid", expr: "v.\(deepdiveVariableIdColumn | asSqlIdent) & ((1::bigint << 48) - 1)" }
            , { alias: "cardinality", expr: "COUNT(c.cid)" }
            , { alias: "cids", expr: "ARRAY_AGG(c.cid ORDER BY c.cid)" }
            # quote the truthiness column like every other interpolated column name
            , { alias: "truthiness", expr: "ARRAY_AGG(COALESCE(v.\(deepdiveVariableLabelTruthinessColumn | asSqlIdent), 0) ORDER BY c.cid)" }
            ]
        , FROM: [ { alias: "v", table: .variablesWithIdsTable } ]
        , JOIN:
            # category ids are necessary to find the inference result corresponding to the variable
            [ { INNER: { alias: "c", table: .variablesCategoriesTable }
              , ON: { and:  [ .variablesCategoryColumns[]
                            | { eq: [ { table: "c", column: "_\(.)" }
                                    , { table: "v", column: . }
                                    ] }
                            ] } }
            ]
        , WHERE:
            [ { eq: [ { table: "v", column: deepdiveVariableInternalPartitionColumn }
                    , { expr: .pid }
                    ]
              }
            ]
        , GROUP_BY: [ { table: "v", column: deepdiveVariableIdColumn } ]
        } | asSql | asPrettySqlArg) \\
        command=\("
            sampler-dw text2bin domain /dev/stdin /dev/stdout /dev/null | pbzip2 >domains.\(.pid).part-${DEEPDIVE_CURRENT_PROCESS_INDEX}.bin.bz2
        " | @sh) \\
        output_relation=
        "
    }

})


###############################################################################

## factor/*/materialize
# Each inference rule's SQL query is run to materialize the factors and the
# distinct weights used in them.
| .deepdive_.execution.processes += merge($deepdive.inference.factors_[]
    | {
    # add a process for grounding factors
    # Its shell command:
    #  1. creates a "<factorsTable>_view" view over the rule's input_query that
    #     selects the variable id columns, the prefixed category columns of the
    #     variables of categorical functions, the weight parameter columns,
    #     feature_value, and pid (the first variable's id shifted right by 48
    #     bits, cast to INT),
    #  2. materializes the view into the factors table; on Greenplum the table
    #     is created through a temporary "_dummy" table so it can be range
    #     partitioned by pid, one partition per sampler partition,
    #  3. creates the weights table with the distinct weight parameter
    #     combinations found in the factors table (a single row when the weight
    #     has no parameters) plus isfixed, initvalue and an empty wid column.
    "process/grounding/factor/\(.factorName)/materialize": {
        # materializing each factor requires the dependent variables to have their id assigned
        dependencies_: [
            $deepdive.schema.variables_[] | "process/grounding/variable/\(.variableName)/join_ids"
        ],
        # other non-variable tables are also necessary
        input_: [ .input_[]
            | select(ltrimstr("data/") | in($deepdive.schema.variables_byName) | not)
        ],
        style: "cmd_extractor", cmd: "

            deepdive create view \(.factorsTable + "_view" | @sh) as \(
                { SELECT:
                    [ ( .function_.variables[] | { table: "F", column: .columnId } )
                    # TODO: assuming categorical factors' heads have categorical vars only
                    , ( .function_ | select(.isCategorical) | .variables[]
                      | .columnPrefix as $prefix
                      | .schema.variablesCategoryColumns[]
                      | { table: "F", column: "\($prefix)\(.)" } )
                    , ( .weight_.params[] | { table: "F", column: . } )
                    , { alias: "feature_value", table: "F", column: "feature_value" }
                    , { alias: "pid", expr: "(\"F\".\(.function_.variables[0].columnId | asSqlIdent) >> 48)::INT" }
                    ]
                , FROM: [
                    { alias: "F", sql: "\(.input_query)" }
                    ]
                } | asSql | asPrettySqlArg)

            case $(eval \"$(deepdive db parse)\"; echo $DBVARIANT) in
                greenplum) # specialization for Greenplum with PARTITION BY
                    deepdive create table \(.factorsTable + "_dummy" | @sh) as \("
                        SELECT * FROM \(.factorsTable + "_view" | asSqlIdent)
                        LIMIT 0
                    " | asPrettySqlArg)

                    CREATE_TABLE_EXTRA_CLAUSES='
                        PARTITION BY RANGE(pid)
                        ( START (0) INCLUSIVE
                          END (\($deepdive.sampler.partitions)) EXCLUSIVE
                          EVERY (1)
                        )' \\
                    deepdive create table \(.factorsTable | @sh) like \(
                        .factorsTable + "_dummy" | @sh)

                    deepdive sql \("
                        INSERT INTO \(.factorsTable | asSqlIdent)
                        SELECT * FROM \(.factorsTable + "_view" | asSqlIdent)
                    " | asPrettySqlArg)

                    deepdive sql \("
                        DROP TABLE \(.factorsTable + "_dummy" | asSqlIdent)
                    " | asPrettySqlArg)
                    ;;

                *) # otherwise, simply materialize the view
                    deepdive create table \(.factorsTable | @sh) as \("
                        SELECT * FROM \(.factorsTable + "_view" | asSqlIdent)
                    " | asPrettySqlArg)
            esac
            deepdive db analyze \(.factorsTable | @sh)

            # find distinct weights for the factors into a separate table
            deepdive create table \(.weightsTable | @sh) as \(
                { SELECT:
                    # weight parameters
                    [ ( .weight_.params[] | { column: . } )
                    # weight attributes
                    , { alias: "isfixed"  , expr: .weight_.is_fixed      }
                    , { alias: "initvalue", expr: .weight_.init_value    }
                    , { alias: "wid"      , expr: "CAST(NULL AS BIGINT)" } # to be assigned later by the assign_weight_id process
                    ]
                , DISTINCT: true
                , FROM:
                    [ select(.weight_.params | length > 0)
                    # when weight is parameterized, find all distinct ones
                    | { alias: "f", table: .factorsTable }
                    ]
                } | asSql | asPrettySqlArg)

            deepdive db analyze \(.weightsTable | @sh)
        "
    }
})


## assign_weight_id
# Each inference rule gets its weight ids actually assigned.
| .deepdive_.execution.processes += {
    # A single process that numbers the wid column of every factor's weights
    # table in one assign_sequential_id call (comma-separated table list) and
    # then re-analyzes each of those tables.
    "process/grounding/assign_weight_id": {
        # every factor must have been materialized first
        dependencies_: ($deepdive.inference.factors_ | map("process/grounding/factor/\(.factorName)/materialize")),
        style: "cmd_extractor", cmd: (
            ($deepdive.inference.factors_ | map(.weightsTable)) as $weightsTables |
        "
            deepdive db assign_sequential_id \($weightsTables | join(",") | @sh) wid

            \($weightsTables | map("deepdive db analyze \(. | @sh)") | join("\n"))
        ")
    }
}

## global_weight_table
# To view the weights learned by the inference engine later, set up an app-wide table.
| .deepdive_.execution.processes += {
    # A single process that defines the app-wide weights view as the UNION ALL
    # of one SELECT per factor, each exposing wid, isfixed, initvalue and the
    # serialized weight description of that factor's weights table.
    "process/grounding/global_weight_table": {
        # every factor's weights table must exist
        dependencies_: ($deepdive.inference.factors_ | map("process/grounding/factor/\(.factorName)/materialize")),
        style: "cmd_extractor", cmd: (
            # one parenthesized SELECT per factor, glued together with UNION ALL
            ( $deepdive.inference.factors_
            | map(
                { SELECT:
                    [ { column: "wid" }
                    , { column: "isfixed" }
                    , { column: "initvalue" }
                    , { alias: "description", expr: factorWeightDescriptionSqlExpr }
                    ]
                , FROM: [ { table: .weightsTable } ]
                } | asSql | "(\(.))")
            | join("\nUNION ALL\n")
            ) as $unionOfWeightsSql |
        "

        # set up a union view for all weight tables (\(deepdiveGlobalWeightsTable | asSqlIdent))
        deepdive create view \(deepdiveGlobalWeightsTable | @sh) as \($unionOfWeightsSql | asPrettySqlArg)
        ")
    }
}

## factor/*/dump
# The factors are dumped into a set of binary files for the inference engine.
| .deepdive_.execution.processes += merge($deepdive.inference.factors_[]
    | . as $fac
    | [range($deepdive.sampler.partitions) | {pid: .} + $fac][]
    | {
    # add a process for dumping the factors of one partition, one process per
    # (factor, partition id).  Its shell command removes the previous factor,
    # nfactors and nedges files of that partition, exports the factor function
    # id, the number of variables and their polarity (0 for negated variables,
    # 1 otherwise) for the sampler, and then streams the factor rows joined with
    # their weight ids through `sampler-dw text2bin factor` and pbzip2 while
    # recording the number of rows in the nfactors file.
    "process/grounding/factor/\(.factorName)/\(.pid)/dump": {
        # weight ids must have been assigned
        dependencies_: [
            "process/grounding/assign_weight_id"
        ],
        style: "cmd_extractor", cmd: "
            : ${DEEPDIVE_GROUNDING_DIR:=\"$DEEPDIVE_APP\"/run/model/grounding}
            facPath=\"$DEEPDIVE_GROUNDING_DIR\"/factor/\(.factorName | @sh)
            mkdir -p \"$facPath\"
            cd \"$facPath\"
            find . \\( -name  'factors.\(.pid).part-*.bin.bz2' \\
                    -o -name 'nfactors.\(.pid).part-*'         \\
                    -o -name   'nedges.\(.pid).part-*'         \\
                   \\) -exec rm -rf {} +
            export DEEPDIVE_LOAD_FORMAT=tsv
            export DEEPDIVE_UNLOAD_MATERIALIZED=false

            # named codes used by sampler
            export \(.function_.name)Factor=\(.function_.id)  # factor function id
            export numVariablesForFactor=\(.function_.variables | length)
            export areVariablesPositive=\(.function_.variables | map(if .isNegated then "0" else "1" end) | join(" ") | @sh)

            # dump the factors joining the assigned weight ids, converting into binary format for the inference engine
            deepdive compute execute \\
                input_sql=\(
                if .function_.isCategorical then
                "\"$(
                    # categorical factor need to find exact combinations of category values and
                    # weight parameters present in the data, hence a lot of joins! but doesn't waste the weights
                    # Assuming all var tables are categorical; TODO: Support mixed head.
                    echo \(
                    { SELECT:
                        [ ( .function_.variables[]
                        | { alias: .columnId, expr: "f.\(.columnId | asSqlIdent) & ((1::bigint << 48) - 1)"} )
                        , ( .function_.variables[]
                        | { alias: "cid_\(.ordinal)", table: "C\(.ordinal)", column: "cid" } )
                        , { alias: "weight_id", table: "w", column: "wid" }
                        , { alias: "feature_value", table: "f", column: "feature_value" }
                        ]
                    , FROM:
                        [ { alias: "w", table: .weightsTable }
                        , { alias: "f", table: .factorsTable }
                        ]
                    , JOIN: [(.function_.variables[]
                        | "C\(.ordinal)" as $ctable
                        | .columnPrefix as $prefix
                        | { INNER: { alias: $ctable, table: .schema.variablesCategoriesTable }
                          , ON: { and: [ .schema.variablesCategoryColumns[]
                                 | { eq: [ { table: $ctable, column: "_\(.)" }
                                         , { table: "f", column: "\($prefix)\(.)" }
                                         ] }
                                 ] } }
                        )]
                    , WHERE:
                        [ ( .weight_.params[]
                        | { eq: [ { table: "w", column: . }
                                , { table: "f", column: . } ] } )
                        , { eq: [ { table: "f", column: "pid" }
                                , { expr: .pid }]}
                        ]
                    } | asSql | asPrettySqlArg)
                )\"" else # factors over boolean variables
                    { SELECT:
                        [ ( .function_.variables[]
                        | { expr: "f.\(.columnId | asSqlIdent) & ((1::bigint << 48) - 1)"} )
                        , { alias: "weight_id", table: "w", column: "wid" }
                        , { alias: "feature_value", table: "f", column: "feature_value" }
                        ]
                    , FROM:
                        [ { alias: "f", table: .factorsTable }
                        , { alias: "w", table: .weightsTable }
                        ]
                    , WHERE:
                        [ ( .weight_.params[]
                        | { eq: [ { table: "w", column: . }
                                , { table: "f", column: . } ] } )
                        , { eq: [ { expr: "f.\(.function_.variables[0].columnId | asSqlIdent) >> 48" }
                                , { expr: .pid }]}
                        ]
                    } | asSql | asPrettySqlArg
                end) \\
                command=\("
                    # also record the factor count
                    tee >(wc -l >nfactors.\(.pid).part-${DEEPDIVE_CURRENT_PROCESS_INDEX}) |
                    sampler-dw text2bin factor /dev/stdin /dev/stdout nedges.\(.pid).part-${DEEPDIVE_CURRENT_PROCESS_INDEX} \\
                        $\(.function_.name)Factor \\
                        $numVariablesForFactor \\
                        $areVariablesPositive \\
                        | pbzip2 >factors.\(.pid).part-${DEEPDIVE_CURRENT_PROCESS_INDEX}.bin.bz2
                " | @sh) \\
                output_relation=
        "
    }
})

## factor/*/dump_weights
# The weights of each factor are dumped into a set of binary files for the inference engine.
| .deepdive_.execution.processes += merge($deepdive.inference.factors_[] | {
    # add a process for dumping the weights of each factor
    # Its shell command removes the previous weights and nweights files and
    # streams (wid, isfixed as 1/0, value) rows through
    # `sampler-dw text2bin weight` and pbzip2.  The SQL is chosen when the
    # command runs: if the weights.reuse flag file exists under the grounding
    # directory, the value is the weight found in the reuse table under the same
    # description, falling back to initvalue and then 0; otherwise the value is
    # initvalue, or 0 when that is NULL.
    "process/grounding/factor/\(.factorName)/dump_weights": {
        # weight ids must have been assigned
        dependencies_: [
            "process/grounding/assign_weight_id"
        ],
        style: "cmd_extractor", cmd: "
            : ${DEEPDIVE_GROUNDING_DIR:=\"$DEEPDIVE_APP\"/run/model/grounding}
            facPath=\"$DEEPDIVE_GROUNDING_DIR\"/factor/\(.factorName | @sh)
            mkdir -p \"$facPath\"
            cd \"$facPath\"
            find . \\( -name  'weights.part-*.bin.bz2' \\
                    -o -name  'nweights.part-*'        \\
                   \\) -exec rm -rf {} +
            export DEEPDIVE_LOAD_FORMAT=tsv
            export DEEPDIVE_UNLOAD_MATERIALIZED=false

            # flag that signals whether to reuse weights or not
            reuseFlag=\"$DEEPDIVE_GROUNDING_DIR\"/weights.reuse

            # dump the weights (except the description column), converting into binary format for the inference engine
            deepdive compute execute \\
                input_sql=\"$(if [[ -e \"$reuseFlag\" ]]; then
                    echo \(
                    # dump weights with initvalue from previously learned ones
                    { SELECT:
                        [ { table: "w", column: "wid" }
                        , { alias: "isfixed", expr: "CASE WHEN w.isfixed THEN 1 ELSE 0 END" }
                        , { alias: "value", expr: "COALESCE(reuse.weight, w.initvalue, 0)" }
                        ]
                    , FROM: [ { alias: "w", table: .weightsTable } ]
                    , JOIN: { LEFT_OUTER: { alias: "reuse", table: deepdiveReuseWeightsTable }
                            , ON: { eq: [ { table: "reuse", column: "description" }
                                        , { expr: factorWeightDescriptionSqlExpr }
                                        ] }
                            }
                    } | asSql | asPrettySqlArg)
                else
                    echo \(
                    # dump weights from scratch
                    { SELECT:
                        [ { column: "wid" }
                        , { alias: "isfixed", expr: "CASE WHEN isfixed THEN 1 ELSE 0 END" }
                        , { alias: "value", expr: "COALESCE(initvalue, 0)" }
                        ]
                    , FROM: [ { table: .weightsTable } ]
                    } | asSql | asPrettySqlArg)
                fi)\" \\
                command=\("
                    sampler-dw text2bin weight /dev/stdin /dev/stdout nweights.part-${DEEPDIVE_CURRENT_PROCESS_INDEX} | pbzip2 >weights.part-${DEEPDIVE_CURRENT_PROCESS_INDEX}.bin.bz2
                " | @sh) \\
                output_relation=
        "
    }
})

###############################################################################

# Finally, put together everything dumped into a layout the inference engine can easily load from
| .deepdive_.execution.processes += {
    # A single process that assembles the factor graph directory.  Its shell
    # command recreates DEEPDIVE_FACTORGRAPH_DIR from scratch and, for every
    # shard id in DEEPDIVE_FACTORGRAPH_SHARDS (all partitions by default):
    #  - symlinks the shard's dumped variables/domains files of every variable
    #    and the shard's factors files plus the weights files of every factor
    #    into per-name subdirectories of the shard directory,
    #  - writes a "meta" file whose first line is the comma-separated sums of
    #    the nweights, nvars, nfactors and nedges counts (the last three limited
    #    to the shard) and whose second line lists directory paths.
    "process/grounding/combine_factorgraph": {
        # depends on every per-partition variable dump (and domain dump for
        # categorical variables), every per-partition factor dump, every weight
        # dump, and the global weight view
        dependencies_: [(
            $deepdive.schema.variables_[]
            | . as $var
            | [range($deepdive.sampler.partitions) | {pid: .} + $var][]
            | "process/grounding/variable/\(.variableName)/\(.pid)/dump"
            , (select(.variableType == "categorical")
            | "process/grounding/variable/\(.variableName)/\(.pid)/dump_domains")
        ), (
            $deepdive.inference.factors_[]
            | . as $fac
            | [range($deepdive.sampler.partitions) | {pid: .} + $fac][]
            | "process/grounding/factor/\(.factorName)/\(.pid)/dump"
        ), (
            $deepdive.inference.factors_[]
            | "process/grounding/factor/\(.factorName)/dump_weights"
        ), (
            "process/grounding/global_weight_table"
        )],
        output_: ["model/factorgraph"],
        style: "cmd_extractor", cmd: (
            # shell-quoted, space-separated names to iterate over in the generated script
            ([$deepdive.schema.variables_[] | .variableName | @sh] | join(" ")) as $variableNames |
            ([$deepdive.inference.factors_[] | .factorName  | @sh] | join(" ")) as $factorNames   |
        "
        : ${DEEPDIVE_GROUNDING_DIR:=\"$DEEPDIVE_APP\"/run/model/grounding}
        : ${DEEPDIVE_FACTORGRAPH_DIR:=\"$DEEPDIVE_APP\"/run/model/factorgraph}

        # defaults to handling all shards, which can be overridden to be a subset
        : ${DEEPDIVE_FACTORGRAPH_SHARDS:=$(seq 0 \($deepdive.sampler.partitions - 1))}

        # create a fresh empty directory for the new combined factor graph
        rm -rf   \"$DEEPDIVE_FACTORGRAPH_DIR\"
        mkdir -p \"$DEEPDIVE_FACTORGRAPH_DIR\"

        # generate the metadata for the inference engine
        for pid in $DEEPDIVE_FACTORGRAPH_SHARDS; do
            cd \"$DEEPDIVE_FACTORGRAPH_DIR\"
            mkdir -p $pid
            cd $pid
            # create symlinks to the grounded binaries by enumerating variables and factors
            for v in \($variableNames); do
                mkdir -p {variables,domains}/\"$v\"
                find \"$DEEPDIVE_GROUNDING_DIR\"/variable/\"$v\" \\
                    -name \"variables.$pid.part-*.bin.bz2\" -exec ln -sfnv -t variables/\"$v\"/ {} + \\
                 -o -name   \"domains.$pid.part-*.bin.bz2\" -exec ln -sfnv -t   domains/\"$v\"/ {} + \\
                    #
            done
            for f in \($factorNames); do
                mkdir -p {factors,weights}/\"$f\"
                find \"$DEEPDIVE_GROUNDING_DIR\"/factor/\"$f\" \\
                    -name \"factors.$pid.part-*.bin.bz2\" -exec ln -sfnv -t factors/\"$f\"/ {} + \\
                 -o -name      \"weights.part-*.bin.bz2\" -exec ln -sfnv -t weights/\"$f\"/ {} + \\
                    #
            done

            {
                # first line with counts of variables and edges in the grounded factor graph
                cd \"$DEEPDIVE_GROUNDING_DIR\"
                sumup() { { tr '\\n' +; echo 0; } | bc; }
                counts=()
                cd factor
                counts+=($(find \($factorNames) -name 'nweights.part-*' -exec cat {} + | sumup))
                cd ../variable
                counts+=($(find \($variableNames) -name \"nvars.$pid.part-*\" -exec cat {} + | sumup))
                cd ../factor
                counts+=($(find \($factorNames) -name \"nfactors.$pid.part-*\" -exec cat {} + | sumup))
                counts+=($(find \($factorNames) -name \"nedges.$pid.part-*\"   -exec cat {} + | sumup))
                (IFS=,; echo \"${counts[*]}\")
                # second line with file paths
                # FIXME second line and beyond are no longer used, remove
                paths=(\"$DEEPDIVE_FACTORGRAPH_DIR\"/{weights,variables,factors,edges,domains})
                (IFS=,; echo \"${paths[*]}\")
            } > \"$DEEPDIVE_FACTORGRAPH_DIR/$pid/meta\"
        done
        ")
    }
}

## from_grounding
# A nominal process to make it easy to redo the grounding
# TODO remove this once deepdive-do supports process groups
# Registers a process whose command is the shell no-op; the variable
# materialize processes list it as a dependency.
| .deepdive_.execution.processes["process/grounding/from_grounding"] =
    { style: "cmd_extractor", cmd: ": no-op" }

end
